package com.bootdo.rabbitMQ.consumer;

import com.bootdo.web.service.SendSMSService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.util.Map;


/**
 * Consumer-side configuration for user message (SMS) push.
 *
 * <p>Registers a {@link SimpleMessageListenerContainer} on the queue bean named
 * {@code queueFanout1} and acknowledges deliveries manually. Original author's note:
 * each message is meant to be received by only one user.
 *
 * @author wukq (2020/4/13)
 */
@Configuration
@EnableRabbit
public class RabbitFanoutConsumer1 implements RabbitListenerConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(RabbitFanoutConsumer1.class);

    /** Number of failed attempts tolerated before a message is acknowledged and skipped. */
    private static final int MAX_FAILURES = 10;

    /** Base back-off in milliseconds; the actual delay is this value times the failure count. */
    private static final long RETRY_BACKOFF_MILLIS = 30000L;

    @Autowired
    private ConnectionFactory connectionFactory;

    // Not used by the code in this class; kept so the bean's injected dependencies stay unchanged.
    @Autowired
    private SendSMSService sendSMSService;

    /**
     * Creates the handler-method factory used for {@code @RabbitListener} endpoints,
     * configured with a Jackson 2 JSON message converter.
     *
     * @return the configured factory
     */
    @Bean
    public DefaultMessageHandlerMethodFactory queueFanoutPushFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    /**
     * Builds the listener container that consumes the SMS-push queue.
     *
     * <p>The container uses {@link AcknowledgeMode#MANUAL}, so the listener itself must
     * ack or nack every delivery exactly once. A {@link ChannelAwareMessageListener} is
     * used (rather than a plain {@code MessageListener}) because it also receives the
     * {@link Channel} needed for manual acknowledgement.
     *
     * <p>Failure handling: when processing throws, the delivery is nacked with
     * {@code requeue=true} after a back-off of {@code RETRY_BACKOFF_MILLIS * failures}.
     * After more than {@code MAX_FAILURES} failures since the last success or skip, the
     * current delivery is acked (i.e. dropped) and the counter is reset.
     *
     * <p>Original author's notes, retained: to avoid losing messages the exchange, the
     * queue and the message all need to be durable/persistent; a message consists of
     * properties plus a payload (body).
     *
     * @param queue the queue to consume; selected by bean name {@code queueFanout1}
     *              (per the original author's note, declared in {@code RabbitMqFanoutConfig})
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer queueFanoutPushContainer(@Qualifier("queueFanout1") Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMaxConcurrentConsumers(1);
        // Rejected deliveries go back to the queue by default.
        container.setDefaultRequeueRejected(true);
        // Manual acknowledgement: the listener below is responsible for ack/nack.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        container.setMessageListener(new ChannelAwareMessageListener() {
            // Failures since the last success or skip. Not synchronized: the container is
            // capped at one concurrent consumer above, so deliveries are handled one at a time.
            private int failureCount = 0;

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                logger.info("rabbitmq RabbitFanoutConsumer1 接收到短信推送的消息体 :{}", message);
                /*
                 * Sample of a received message as logged above:
                 * (Body:'{createTime=2020-04-16 12:34:11, phone=123456798,
                 * messageId=763713cd-82bc-4669-92a5-3e66b43b14b3, content=test, messageData=message: testFanoutMessage }'
                 * MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0,
                 * receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=fanout_exchange_wukq,
                 * receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-9iI2lNsDq7sGwrbtVSZ6CQ,
                 * consumerQueue=queue_fanout_test])
                 */
                try {
                    Map<String, Object> payload = deserializePayload(message);
                    logger.info("rabbitmq RabbitFanoutConsumer1 接收到短信推送的消息体 :{}{}",
                            payload.get("phone"), payload.get("content"));

                    channel.basicAck(deliveryTag, false);
                    // A success ends the current failure streak so it cannot leak into later messages.
                    failureCount = 0;
                } catch (Exception e) {
                    logger.error("exception--->", e);
                    failureCount++;
                    if (failureCount > MAX_FAILURES) {
                        logger.warn("<------------- queue_fanout_test : [{}], 消息出错,失败次数{},  跳过发送此消息, 进入下一条--------------->",
                                message, failureCount);
                        failureCount = 0;
                        // Ack to drop the poison message, then stop: a delivery tag must be
                        // acknowledged exactly once, so we must not fall through to the nack below.
                        channel.basicAck(deliveryTag, false);
                        return;
                    }
                    logger.error("消息推送记录出错：{}; 重新发送,次数:{}", e.getMessage(), failureCount);
                    requeueAfterBackoff(channel, deliveryTag, RETRY_BACKOFF_MILLIS * failureCount);
                }
            }
        });
        return container;
    }

    /**
     * Installs {@link #queueFanoutPushFactory()} as the handler-method factory for
     * annotated Rabbit listener endpoints.
     *
     * @param rabbitListenerEndpointRegistrar the registrar to configure
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(queueFanoutPushFactory());
    }

    /**
     * Deserializes the message body, which the producer is expected to send as a
     * Java-serialized {@code Map}.
     *
     * <p>NOTE(review): this is Java-native deserialization of data read from the broker.
     * If the queue can ever carry untrusted input, switch to a JSON payload or restrict
     * the deserializable classes.
     *
     * @param message the received AMQP message
     * @return the deserialized payload; may be {@code null} if the body deserializes to nothing
     */
    @SuppressWarnings("unchecked") // payload shape is a producer/consumer contract, not checkable here
    private static Map<String, Object> deserializePayload(Message message) {
        return (Map<String, Object>) SerializationUtils.deserialize(message.getBody());
    }

    /**
     * Waits for the given back-off and then nacks the delivery with {@code requeue=true}.
     *
     * <p>If the wait is interrupted the delivery is still nacked immediately, and the
     * thread's interrupt flag is restored afterwards.
     *
     * @param channel       channel the delivery arrived on
     * @param deliveryTag   tag of the delivery to requeue
     * @param backoffMillis time to wait before requeueing, in milliseconds
     * @throws java.io.IOException if the nack cannot be sent
     */
    private static void requeueAfterBackoff(Channel channel, long deliveryTag, long backoffMillis)
            throws java.io.IOException {
        boolean interrupted = false;
        try {
            Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        try {
            channel.basicNack(deliveryTag, false, true);
        } finally {
            if (interrupted) {
                // Restore the flag only after the nack has been attempted.
                Thread.currentThread().interrupt();
            }
        }
    }
}
